[WIP] perf(core): Add a new mutation pipeline for per predicate runners #9467
Conversation
Mechanical fixes after merging current main:
- Update stale `hypermodeinc/dgraph` import paths to `dgraph-io/dgraph` in posting/lists.go and worker/sort_test.go.
- Fix posting/mvcc_test.go TestRegression9597 (added on main): the branch refactored LocalCache.deltas from a map to a *Deltas wrapping a sharded map, so direct map indexing no longer compiles. Use Deltas.AddToDeltas() instead.
- Bring types/locked_sharded_map.go license header into the project's standard format.
- Doc fix in TESTING.md (stray hypermodeinc reference).
No behavior change.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Removes the unconditional debug prints scattered through posting/ during the original WIP work; they fired on every read, mutation, rollup, and commit. None of them were guarded by a verbosity flag, so under load they would have produced megabytes of stdout noise per second.
Sites stripped:
- posting/lists.go: READING / READING SINGLE / GETTING KEY FROM DELTAS
- posting/index.go: TOKENS, LOCAL MAP, INSERTING INDEX, UPDATE INDEX, ERRORRRING, "Inserting tokenizer indexes ... took"
- posting/mvcc.go: COMMITTING (and unused fmt import)
- posting/list.go: "Buidlding committed uids", "Setting mutation", PrintRollup helper (called once internally, never elsewhere)
Left in place: printTreeStats() in index.go, which is already gated by the DEBUG_SHOW_HNSW_TREE env var and is an intentional opt-in HNSW debug helper.
No behavior change.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Two large dead-code blocks left over from the original WIP:
- posting/index.go: ~175 lines of an alternate InsertTokenizerIndexes implementation, fully commented out. The live implementation directly above it supersedes it; keeping the commented variant just made the file harder to follow. Also drop the scattered "//fmt.Println(...)" leftovers next to live code.
- worker/draft_test.go: BenchmarkProcessListIndex was added entirely commented out and references methods (DefaultPipeline, ProcessListWithoutIndex, ProcessListIndex) that don't exist on MutationPipeline. If we want a benchmark for the pipeline, we should write one against the real API.
No behavior change.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Replace the hardcoded `featureFlag := true` in applyMutations with a real superflag knob, defaulted off:
- Add WorkerOptions.MutationsUsePipeline (bool) in x/config.go.
- Extend the feature-flags superflag with mutations-use-pipeline=false and wire alpha to populate WorkerConfig.MutationsUsePipeline from it.
- worker/draft.go applyMutations now branches on x.WorkerConfig.MutationsUsePipeline; the default of false routes mutations through the legacy path, preserving current behavior.
Tests can opt into the new pipeline by setting x.WorkerConfig.MutationsUsePipeline = true.
CLI usage: dgraph alpha --feature-flags="mutations-use-pipeline=true"
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
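A minimal sketch of that wiring, assuming the ristretto/z SuperFlag helpers dgraph already uses for superflags; everything besides the flag name is illustrative, not the actual diff:

```go
// Sketch: reading a --feature-flags superflag into a bool knob.
// Assumes github.com/dgraph-io/ristretto/z, which dgraph uses elsewhere.
package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

// Default string in the style of dgraph's superflag defaults.
const featureFlagsDefaults = `mutations-use-pipeline=false;`

func main() {
	// alpha/run.go-style parsing of the user-supplied flag value.
	conf := z.NewSuperFlag(`mutations-use-pipeline=true`).
		MergeAndCheckDefault(featureFlagsDefaults)

	usePipeline := conf.GetBool("mutations-use-pipeline")
	fmt.Println("MutationsUsePipeline:", usePipeline) // true

	// worker/draft.go-style branch.
	if usePipeline {
		// route the proposal through the per-predicate pipeline
	} else {
		// legacy serial path (the default)
	}
}
```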
TestCount is t.Skip()'d on the branch, but the reason wasn't recorded. Investigation: the test launches concurrent transactions sharing entity uids and bypasses the Oracle's conflict-checking commit path; it just calls CommitToDisk() directly with disjoint commit timestamps.

Both the legacy AddMutationWithIndex path and the new mutation pipeline fail it identically: with two threads adding edges to the same subject's [uid] @count predicate, neither path can serialize @count updates without real txn conflicts, so the count index ends up inconsistent and many subjects are missing from count(N). This is expected without conflict checking; the unit harness can't exercise the safety the Oracle provides.

Re-enable when we wire either: (a) Oracle.WaitForTs/IsAborted into the harness, or (b) this test through worker.applyMutations() (which does invoke the Oracle conflict path). Single-thread TestCount passes, so the per-predicate pipeline's own count logic is correct in the absence of contention. The existing TestStringIndexWithLang covers the multithreaded happy path with disjoint uids.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
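For context, a minimal sketch of the kind of commit-time conflict gate the Oracle provides and the harness bypasses; this is an illustrative model, not the code in dgraph/cmd/zero/oracle.go:

```go
// Sketch of timestamp-ordered conflict detection in the Oracle's style:
// a txn must abort if any conflict key it wrote was committed by another
// txn after this txn's start timestamp.
package oracle

type txn struct {
	startTs uint64
	keys    []string // conflict keys written by this txn
}

type oracle struct {
	commits map[string]uint64 // last commit ts per conflict key
	nextTs  uint64
}

func (o *oracle) hasConflict(t *txn) bool {
	for _, k := range t.keys {
		if commitTs, ok := o.commits[k]; ok && commitTs > t.startTs {
			return true
		}
	}
	return false
}

// tryCommit assigns a commit ts only to conflict-free txns. Calling
// CommitToDisk directly, as the skipped test does, skips exactly this gate.
func (o *oracle) tryCommit(t *txn) (commitTs uint64, ok bool) {
	if o.hasConflict(t) {
		return 0, false // caller must abort and retry
	}
	o.nextTs++
	for _, k := range t.keys {
		o.commits[k] = o.nextTs
	}
	return o.nextTs, true
}
```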
…ssCount
Bug: scalar @count writes were nondeterministically losing data under concurrent transactions. Roughly half the deltas committed by the new mutation pipeline contained only a [DeleteAll] posting and no Set, so reads at maxTs returned an empty value list.

Root cause: in ProcessSingle, handleOldDeleteForSingle appends a synthetic Del-of-old-value to postings[uid] alongside the user's Set, so InsertTokenizerIndexes / ProcessReverse / count diffing can see the prior value. ProcessCount then iterates the postings and calls list.updateMutationLayer(post, singleUidUpdate=true, ...) on each. For non-Lang scalar predicates fingerprintEdge returns math.MaxUint64, so the synthetic Del and the user Set both have Uid == math.MaxUint64. The first iteration (Set new) leaves mutationMap.currentEntries = [DeleteAll, Set new]; the second iteration (Del old) finds the Set we just inserted via findPosting and applies updateMutationLayer in singleUidUpdate mode, which unconditionally rewrites currentEntries to [DeleteAll] (the Del branch never appends mpost), wiping the new value.

Fix: in ProcessCount, when iterating a !isListEdge predicate's postings, if the list contains a Set/Ovr posting, treat any Del as synthetic and skip it for the data-list update. Standalone user Dels (no accompanying Set) are still applied. Index/reverse/count diffing already happen before ProcessCount runs and aren't affected.

Repro: TestPipelineCountIndexConcurrent in worker/sort_test.go is a new conflict-aware in-process harness that mirrors the systest TestCountIndexConcurrentSetDelScalarPredicate. It runs 200 contending transactions setting <0x1> <name> "name<rand>" against a "string @index(exact) @count" schema with a fakeOracle that implements the same hasConflict algorithm as dgraph/cmd/zero/oracle.go. Pre-fix the test fails roughly 50% of runs with an empty data list and the wrong count buckets; post-fix it is stable across 20+ -count iterations and under -race.

Existing tests (TestScalarPredicateIntCount, *RevCount, *Count, TestSingleUidReplacement, TestDeleteSetWithVarEdgeCorruptsData, TestStringIndexWithLang, TestMultipleTxnListCount, TestGetScalarList, TestDatetime) all pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
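A schematic of the fix under simplified stand-in types; applyToDataList and the posting struct below are hypothetical illustrations, not the actual diff:

```go
// Schematic of the fix: for a scalar (non-list) predicate whose postings
// hold both the user's Set and the synthetic Del-of-old-value, only the
// Set may reach the data list.
package pipeline

type op int

const (
	opSet op = iota
	opDel
)

type posting struct {
	op    op
	value []byte
}

// applyToDataList filters a !isListEdge predicate's postings before the
// data-list update: with a Set present, every Del is the synthetic
// Del-of-old-value added for index/count diffing and must be skipped;
// a standalone user Del (no accompanying Set) is still applied.
func applyToDataList(postings []posting, apply func(posting)) {
	hasSet := false
	for _, p := range postings {
		if p.op == opSet {
			hasSet = true
			break
		}
	}
	for _, p := range postings {
		if p.op == opDel && hasSet {
			continue // synthetic; diffing already consumed the old value
		}
		apply(p)
	}
}
```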
When loading via dgraph live (or any mutation source whose uids span
the full uint64 range, including xidmap-assigned uids), the
per-predicate pipeline hung indefinitely on the very first batch with
zero forward progress. A goroutine dump showed the dispatcher
goroutine wedged on `chan send (nil chan)` at the line:
    chMap[int(uid)%numGo] <- uid
uid is uint64. Casting directly to int produces a negative value for
uid >= 2^63, so int(uid)%10 can be in [-9, -1]. chMap[-3] returns the
zero value for a chan uint64, which is a nil channel; sending on a
nil channel blocks forever.
The 10 worker goroutines (also created here) were idle on
`for uid := range uids` since no uids ever reached them, so the
parent `wg.Wait()` and the surrounding errgroup never returned.
applyMutations therefore never released the txn, the alpha's old-txn
abort loop kept retrying every minute, and live-load showed
"Txns: 0 N-Quads: 0" indefinitely.
Fix: hash unsigned, then cast: `chMap[int(uid%uint64(numGo))]`.
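A self-contained repro of the arithmetic, runnable as-is; the channel map mirrors the dispatcher's shape but is otherwise schematic:

```go
// Demonstrates why int(uid) % numGo breaks for uids >= 2^63 and why
// hashing unsigned first is safe.
package main

import "fmt"

func main() {
	const numGo = 10
	uid := uint64(1) << 63 // uids >= 2^63 are legal; xidmap can assign them

	// Buggy: int(uid) is negative on 64-bit platforms, so the shard index
	// lands in [-9, -1]. Indexing the channel map with it yields the zero
	// value, a nil channel, and a send on it blocks forever.
	buggyShard := int(uid) % numGo
	fmt.Println(buggyShard) // -8

	// Fixed: reduce modulo in uint64 space first, then cast. The result
	// is always in [0, numGo).
	fixedShard := int(uid % uint64(numGo))
	fmt.Println(fixedShard) // 8

	chMap := map[int]chan uint64{}
	for i := 0; i < numGo; i++ {
		chMap[i] = make(chan uint64, 1)
	}
	chMap[fixedShard] <- uid // chMap[buggyShard] would be nil: permanent block
}
```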
Verified end-to-end with the live loader against the 1million.rdf.gz
benchmark dataset (1,041,684 n-quads, schema mixes [uid] @reverse
@count, [uid] @count, datetime @index(year), string @index(...) @lang,
geo @index(geo), string @index(exact) @upsert):
legacy : 13.85s / 14.74s (avg ~14.3s, ~77k n-quads/s)
pipeline : 9.65s / 9.36s (avg ~9.5s, ~116k n-quads/s)
That is ~1.50x faster on a realistic multi-predicate, multi-index
workload — i.e. the case the per-predicate runner pipeline is built
for.
Also adds worker/pipeline_bench_test.go: in-process Go benchmarks
comparing legacy runMutation vs newRunMutations across a matrix of
(predicates, edges-per-predicate, indexed/non-indexed) shapes. They
show the pipeline loses ~2x on tiny mutations (1-10 edges) and wins
1.2x-1.55x on bulk (10 preds x 100+ edges, indexed or not), which is
why the feature flag stays default-off and the live-loader speedup
above is the right place to evaluate this work.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
The benchmark matrix in worker/pipeline_bench_test.go showed the
pipeline loses ~2x on small mutations (≤10 edges total) and wins
~1.5x on bulk (live-loader sized: 1000 edges per txn across many
predicates). A binary on/off flag forces an all-or-nothing choice,
penalising whichever side of that crossover the workload spends most
time on.
Replace MutationsUsePipeline (bool) with MutationsPipelineThreshold
(int):
threshold = 0 -> never use the pipeline (default; legacy behavior)
threshold = 1 -> always use the pipeline (any txn with ≥1 edge)
threshold = N -> use the pipeline only when len(m.Edges) >= N
The threshold compares against total edges in the proposal. From the
benches the crossover is around 100; the live-loader 1M dataset uses
~1000 edges per txn, so anything from 100-1000 will engage the
pipeline only on bulk-shaped mutations and leave small interactive
mutations on the legacy serial path.
Wiring:
- x.WorkerConfig.MutationsPipelineThreshold (int) replaces the
bool field.
- feature-flags superflag: "mutations-pipeline-threshold=0".
- alpha/run.go reads it via featureFlagsConf.GetInt64.
- worker/draft.go applyMutations branches on
`t > 0 && len(m.Edges) >= t`.
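A sketch of the resulting dispatch; workerConfig and usePipeline below are stand-ins for x.WorkerConfig and the real branch in applyMutations:

```go
// Sketch of the threshold dispatch described above.
package worker

var workerConfig struct {
	MutationsPipelineThreshold int64
}

// usePipeline implements the semantics above: 0 disables the pipeline,
// 1 is always-on, N engages it only for proposals with at least N edges.
func usePipeline(numEdges int) bool {
	t := int(workerConfig.MutationsPipelineThreshold)
	return t > 0 && numEdges >= t
}

// In applyMutations (schematic; signatures are illustrative):
//
//	if usePipeline(len(m.Edges)) {
//		return newRunMutations(ctx, m, txn)
//	}
//	return runMutation(ctx, m, txn) // legacy serial path
```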
Verified end-to-end against the live-loader benchmark
(1million.rdf.gz, official 1M schema):
threshold=0 : 13.56s, 80,129 N-Quads/s (legacy, matches baseline)
threshold=1 : 9.92s, 115,742 N-Quads/s (always-on, matches prior)
CLI usage:
dgraph alpha --feature-flags="mutations-pipeline-threshold=200"
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Engages the per-predicate mutation pipeline by default so that systest runs (and any other test suites that don't override feature-flags) exercise the pipeline path on every mutation, not the legacy serial path. A threshold of 1 means "any mutation with ≥1 edge takes the pipeline", i.e. always on. This is a deliberate ramp toward shipping the pipeline.

Operators who want to opt small interactive mutations out of the pipeline (where benches showed ~2x slowdown for ≤10-edge txns) can set a higher threshold:
dgraph alpha --feature-flags="mutations-pipeline-threshold=200"
To turn the pipeline fully off, set 0.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
The original branch commit 41d6445 ("fixed some bug") replaced the IsEmpty(readTs) call in IterateDisk with a hardcoded `false`, forcing every key found by the iterator to be reported as non-empty. That broke has(<predicate>) for any uid whose value had been removed via star-deletion (<uid> <pred> *): the data list still exists in badger with a DeleteAll marker on top, but the live posting list is empty at readTs; IsEmpty returns true and the uid should be skipped.

Surfaced by systest TestSystestSuite/TestHasDeletedEdge in systest/mutations-and-queries: 3 nodes are created with <end> "", one is star-deleted, and the follow-up has(end) is expected to return 2 uids. With IsEmpty stubbed to false it returned 3.

No comment was left on the original change. Restoring the call. The mutations-and-queries package is fully green with this in place (66/66 tests pass, including TestHasDeletedEdge); if a real underlying issue motivated the original disable we'll chase it with a real diagnosis instead of silently dropping a safety check.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
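Schematically, the restored check looks like this; List, IsEmpty, and iterateDisk below are simplified stand-ins shaped like the description above, not the real badger-backed code:

```go
// Sketch of the restored skip in IterateDisk.
package posting

type List struct {
	liveAtReadTs int // postings visible at readTs, after any DeleteAll
}

// IsEmpty mirrors the (readTs, afterUid) shape of the real call: true when
// nothing is visible at readTs, e.g. a star-deletion left only a DeleteAll
// marker on top of the data list.
func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
	return l.liveAtReadTs == 0, nil
}

// iterateDisk reports only uids whose posting list is non-empty at readTs.
// The branch commit had hardcoded this check to false, so star-deleted
// uids leaked back into has(<predicate>) results.
func iterateDisk(lists map[uint64]*List, readTs uint64, report func(uint64)) error {
	for uid, l := range lists {
		empty, err := l.IsEmpty(readTs, 0)
		if err != nil {
			return err
		}
		if empty {
			continue
		}
		report(uid)
	}
	return nil
}
```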
Revival of
Phase 1 — mechanical cleanup
| Commit | What |
|---|---|
| 436b55f8b | Resolve the merge of main into the branch (import paths hypermodeinc/dgraph → dgraph-io/dgraph, license header, two-line fix in mvcc_test.go::TestRegression9597 for the new Deltas type). |
| a2fcc09f7 | Strip ~30 unconditional fmt.Println debug calls from hot paths (posting/lists.go, index.go, list.go, mvcc.go). They fired on every read/mutation/rollup. The opt-in printTreeStats HNSW debug helper (gated by DEBUG_SHOW_HNSW_TREE) was kept. |
| 4fe27e1d4 | Remove dead commented-out code: ~175 lines of an alternate InsertTokenizerIndexes impl in posting/index.go, plus all 101 commented-out lines of BenchmarkProcessListIndex in worker/draft_test.go (referenced methods that don't exist). |
| 8fbb0bdf2 | Replace the hardcoded featureFlag := true in applyMutations with a real superflag knob mutations-use-pipeline. (Subsequently superseded by the threshold form below.) |
Phase 2 — correctness fixes (real bugs)
| Commit | What |
|---|---|
| 2d2afb190 | Document why TestCount in worker/sort_test.go is t.Skip()'d. The test bypasses Oracle conflict checking and calls CommitToDisk directly with disjoint commit ts; both legacy and new paths fail it. The skip is correct as-is — re-enable when the harness uses real txn conflicts. |
| 43b826d0a | Scalar Del-of-old-value wiping the new Set in ProcessCount. handleOldDeleteForSingle appends a synthetic Del oldVal posting next to the user's Set newVal so index/count diffing can see the prior value. ProcessCount then iterated postings and called updateMutationLayer(post, singleUidUpdate=true) on each. For non-Lang scalars fingerprintEdge returns math.MaxUint64, so both postings collide on the same Uid and the second iteration's Del unconditionally rewrote currentEntries to [DeleteAll], dropping the new value. Fix: in the !isListEdge branch, when the postings list contains a Set/Ovr, treat any Del as synthetic and skip it for the data-list update. Surfaced by a new in-process conflict-aware harness TestPipelineCountIndexConcurrent (worker/sort_test.go) that mirrors the systest TestCountIndexConcurrentSetDelScalarPredicate; pre-fix it failed ~50% of runs, post-fix stable across -count=20 and -race. |
| 792923279 | InsertTokenizerIndexes deadlock on uids ≥ 2^63. The dispatcher in posting/index.go did chMap[int(uid)%numGo] <- uid. int(uid) is signed, so for any uid in [2^63, 2^64) the modulo result is negative; chMap[-N] returns Go's zero value — a nil channel — and sending on a nil channel blocks forever. Surfaced by live load on 1million.rdf.gz: the very first batch wedged, alpha's old-txn abort loop kept retrying every minute, and the loader showed Txns: 0 N-Quads: 0 indefinitely. Two-character fix: chMap[int(uid%uint64(numGo))] <- uid. |
| 8ccdae630 | IterateDisk had IsEmpty stubbed to false (introduced by branch commit 41d6445ce1 "fixed some bug" with no comment). That forced every key the iterator found to be reported as non-empty. Broke has(<predicate>) for any uid whose value had been removed via star-deletion (<uid> <pred> *): the data list still exists in badger with a DeleteAll marker on top, but the live posting list at readTs is empty — IsEmpty returns true and the uid should be skipped. Surfaced by TestSystestSuite/TestHasDeletedEdge in systest/mutations-and-queries. Restoring the call. |
Feature flag — threshold instead of on/off
| Commit | What |
|---|---|
| c8f5fbcf3 | Replace MutationsUsePipeline bool with MutationsPipelineThreshold int. A mutation runs through the pipeline only when threshold > 0 && len(m.Edges) >= threshold; otherwise it falls back to legacy. 0 = pipeline off entirely; 1 = always on; larger values opt small interactive mutations out (benches show the pipeline pays goroutine spin-up cost on tiny txns and only wins on bulk-shaped ones). CLI: dgraph alpha --feature-flags="mutations-pipeline-threshold=200". |
| 86ad1eea9 | Flip the default to 1 so test suites exercise the pipeline path on every mutation. Reset to 0 before merging once we're confident in the systest signal. |
Benchmarks
In-process matrix (worker/pipeline_bench_test.go, included in 792923279) on Apple M4 Pro, 30 iterations:
| Config | Legacy ns/op | Pipeline ns/op | Pipeline / Legacy |
|---|---|---|---|
| 1 pred × 1 edge | 39,753 | 89,581 | 2.25× slower |
| 10 preds × 1 edge | 190,672 | 451,854 | 2.37× slower |
| 1 pred × 100 edges | 654,775 | 727,931 | 1.11× slower |
| 50 preds × 100 edges | 31,630,750 | 29,959,582 | 1.06× faster |
| 10 preds × 100 edges | 9,448,342 | 7,441,912 | 1.27× faster |
| 1 pred × 1000 edges | 5,753,151 | 4,957,482 | 1.16× faster |
| 10 preds × 1000 edges | 69,132,992 | 58,908,736 | 1.17× faster |
| 10 preds × 1000 edges, no index | 31,308,575 | 20,238,396 | 1.55× faster |
| 50 preds × 1000 edges | 403,867,619 | 375,424,165 | 1.07× faster |
Crossover is around 100 total edges. Below that, goroutine spin-up cost dominates; above it, the pipeline wins. Memory is comparable on bulk; on the smallest case the pipeline allocs ~6× more (70 KB vs 12 KB).
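The matrix has roughly this shape (a sketch, not the contents of worker/pipeline_bench_test.go; the run functions are stand-ins for the legacy and pipeline paths):

```go
// Sketch of a (predicates x edges-per-predicate) benchmark matrix
// comparing the two mutation paths.
package worker

import (
	"fmt"
	"testing"
)

type edge struct{ pred string }

func makeEdges(preds, perPred int) []edge {
	es := make([]edge, 0, preds*perPred)
	for p := 0; p < preds; p++ {
		for i := 0; i < perPred; i++ {
			es = append(es, edge{pred: fmt.Sprintf("pred%d", p)})
		}
	}
	return es
}

func runLegacy(es []edge)   { /* stand-in for runMutation */ }
func runPipeline(es []edge) { /* stand-in for newRunMutations */ }

func BenchmarkMutationPaths(b *testing.B) {
	for _, cfg := range []struct{ preds, perPred int }{
		{1, 1}, {10, 1}, {1, 100}, {10, 100}, {1, 1000}, {10, 1000}, {50, 1000},
	} {
		edges := makeEdges(cfg.preds, cfg.perPred)
		for _, path := range []struct {
			name string
			run  func([]edge)
		}{{"legacy", runLegacy}, {"pipeline", runPipeline}} {
			name := fmt.Sprintf("%s/%dpreds_%dedges", path.name, cfg.preds, cfg.perPred)
			b.Run(name, func(b *testing.B) {
				for i := 0; i < b.N; i++ {
					path.run(edges)
				}
			})
		}
	}
}
```

Something like `go test -bench=MutationPaths -benchmem ./worker/` then reproduces a matrix of this shape, with -benchmem covering the allocation comparison noted above.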
End-to-end live load against the 1M dataset (1million.rdf.gz, official 1M schema with [uid] @reverse @count, [uid] @count, datetime @index(year), string @index(hash, term, trigram, fulltext) @lang, geo @index(geo), string @index(exact) @upsert):
| mutations-pipeline-threshold | Avg time | Avg N-Quads/s |
|---|---|---|
| 0 (legacy) | ~14.3 s | ~77K |
| 1 (always-on pipeline) | ~9.5 s | ~116K |
~1.50× faster on a realistic bulk-shaped workload — the case the per-predicate runner was designed for.
Test results
With mutations-pipeline-threshold=1:
- `posting/`, `worker/`, `types/`, `schema/`: all unit tests pass, including `TestPipelineCountIndexConcurrent` under `-count=20 -race`.
- `./t --suite=systest-baseline`: all 13 packages green: systest, systest/acl/restore, systest/audit, systest/audit_encrypted, systest/backup/filesystem, systest/cdc, systest/cloud, systest/export, systest/group-delete, systest/loader, systest/multi-tenancy, systest/mutations-and-queries (66 tests, including the previously-failing TestHasDeletedEdge), systest/plugin. Total wall clock 7m38s.
- `./t --suite=core,integration`: acl (150.8s) green, all systest-baseline cached green, plus 4 large vector tests green: TestVectorIncrBackupRestore (124s), TestVectorBackupRestore (72s), TestVectorBackupRestoreDropIndex (23s), TestVectorBackupRestoreReIndexing (110s). The suite then died on a pre-existing Mac-only bug in dgraphtest (commit ef6f27da79 "Fix running tests on mac" on main), unrelated to the pipeline: getPortMappingsOnMac at dgraphtest/dgraph.go:435 assumes every docker ps port entry has the shape host:port->container/proto, but Docker also emits bare port/tcp entries (no host mapping), which split on ":" to length 1 and panic at [1]. Linux CI won't hit this.
Out of scope, worth noting
- The bulk loader (dgraph/cmd/bulk/) doesn't import any of the pipeline machinery; it has its own map-reduce path. The "speed up bulk loader" framing in the PR's lineage doesn't survive code inspection. To apply per-predicate-runner ideas to bulk you'd need to refactor mapper.go's tokenization into a per-predicate goroutine pool; separate work.
- TestCount in worker/sort_test.go is left t.Skip()'d with a clear reason (harness lacks Oracle conflict checking; both legacy and pipeline fail it).
Recommended deployment
- Reset the default to 0 before merging.
- Operators who want the speedup on bulk-only workloads can opt in with --feature-flags="mutations-pipeline-threshold=200" (or whatever the per-mutation crossover is for their workload).
- Setting 1 (always-on) is fine for systest CI runs and is a useful "ramp-up" knob.
Found 5 test failures on Blacksmith runners.
…p stale READING debug prints
Two bugs surfaced by graphql/e2e/auth's TestOrderAndOffset (cleanup
mutation `mutation DelTask { deleteTask(filter: {}) }` crashed the
alpha mid-request, causing the test client to see EOF on POST):
1. posting/index.go ProcessSingle SIGSEGV at line 675.
GraphQL deleteTask cleanup expands into multiple Del edges per
entity (one per predicate the entity has — uid, type, list edges,
etc.). When two Del edges to the same uid land in one transaction's
batch, the second iteration through ProcessSingle's per-edge loop
does:
    pl, exists := postings[uid]
    if exists {
        if edge.Op == DEL {
            oldVal = findSingleValueInPostingList(pl)
            if string(edge.Value) == string(oldVal.Value) { ... } // <- nil deref
        }
    }
findSingleValueInPostingList only returns Set postings; if the
accumulated list holds only Dels (from the prior iteration), it
returns nil and we panic dereferencing oldVal.Value.
Two fixes here (see the sketch after point 2 below):
- Guard the deref: `if oldVal != nil && string(...) == ...`.
- Move `var oldVal *pb.Posting` inside the loop. It was declared
at function scope, so a stale value from one edge could bleed
into the nil-guarded branch for a different uid on a later
iteration. Per-edge scope makes the intent explicit.
2. worker/task.go: two leftover `fmt.Println("READING SINGLE", ...)`
and `fmt.Println("READING", ...)` calls in the value-postings
read path. Same class of debug spew Phase 1B stripped from
posting/, missed because that sweep didn't include worker/.
Removed both. Safe — they were unconditional prints on every
query value read.
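Back to bug 1, the per-edge loop after both fixes as a self-contained sketch; Posting, the op constants, and findSingleValueInPostingList are simplified stand-ins, not the ProcessSingle internals:

```go
// Sketch of the guarded loop: oldVal is scoped per edge and its deref is
// nil-guarded, so a Dels-only accumulated list no longer panics.
package posting

type op int

const (
	SET op = iota
	DEL
)

type Posting struct {
	Op    op
	Value []byte
}

type Edge struct {
	Uid   uint64
	Op    op
	Value []byte
}

// Returns the Set posting if one exists; nil for a Dels-only list.
func findSingleValueInPostingList(pl []Posting) *Posting {
	for i := range pl {
		if pl[i].Op == SET {
			return &pl[i]
		}
	}
	return nil
}

func processSingle(edges []Edge, postings map[uint64][]Posting) {
	for _, edge := range edges {
		// Fix 2: oldVal is per-edge; at function scope a stale value
		// from one edge could bleed into a later iteration for a
		// different uid.
		var oldVal *Posting

		if pl, ok := postings[edge.Uid]; ok && edge.Op == DEL {
			oldVal = findSingleValueInPostingList(pl)
			// Fix 1: nil-guard. With two DELs to the same uid in one
			// batch, the accumulated list holds only Dels, oldVal is
			// nil, and the unguarded deref used to SIGSEGV.
			if oldVal != nil && string(edge.Value) == string(oldVal.Value) {
				// delete-of-current-value handling
			}
		}
	}
}
```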
The graphql/e2e/auth and graphql/e2e/auth/debug_off packages now both
pass in 30s. `./posting/` and `./worker/` unit tests still green.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>